// Package dispatcher
package dispatcher

import (
	"acs/comet/servernode"
	"acs/eventbroker"
	"encoding/json"
	"strings"
	"sync"
	"time"

	log "github.com/Sirupsen/logrus"
)

type Monitor struct {
	nodes              map[string]*servernode.Node
	lastSuggestedNode  *servernode.Node
	nodesLock          *sync.RWMutex
	regServerEndpoints []string
	monitorRoot        string
	watcher            *eventbroker.Client
}

func NewMonitor(regServerEndpoints []string, authInfoStr string, nodeMonitorRoot string) (*Monitor, error) {
	m := &Monitor{
		regServerEndpoints: regServerEndpoints,
		monitorRoot:        nodeMonitorRoot,
		nodesLock:          &sync.RWMutex{},
		nodes:              make(map[string]*servernode.Node),
	}
	authInfo := strings.Split(authInfoStr, ":")
	brokerCfg := eventbroker.Config{
		Endpoints: regServerEndpoints,
	}
	if len(authInfo) > 1 {
		brokerCfg.Username = authInfo[0]
		brokerCfg.Password = authInfo[1]
	}
	client, err := eventbroker.NewClient(brokerCfg)
	if err != nil {
		return nil, err
	}
	m.watcher = client
	return m, nil
}

func (this *Monitor) Start() {
	this.getAllNodes()
	go this.updateTicker()
	nodeChangeInfoChan := make(chan eventbroker.WatchResponse)
	this.watcher.Watch(this.monitorRoot, nodeChangeInfoChan, true)
	for {
		info := <-nodeChangeInfoChan
		nodeInfo := &servernode.Node{}
		this.nodesLock.Lock()
		if info.Action == "delete" {
			delete(this.nodes, info.Key)
		} else {
			err := json.Unmarshal([]byte(info.Value), nodeInfo)
			if err != nil {
				log.Warnf("Failed to decode node info: %v, origin key/value: [%v]/[%v]", err, info.Key, info.Value)
				continue
			}
			this.nodes[info.Key] = nodeInfo
		}
		this.nodesLock.Unlock()
	}
}

// getAllNodes 一次性获取所有节点信息, 一般只在启动时运行一次, 随后由updateTicker实时更新.
func (this *Monitor) getAllNodes() {
	nodes, err := this.watcher.GetNodes(this.monitorRoot)
	if err != nil {
		log.Warnf("Failed to get all nodes info: %v", err)
	}
	for key, value := range nodes {
		nodeInfo := &servernode.Node{}
		err := json.Unmarshal([]byte(value), nodeInfo)
		if err != nil {
			log.Warnf("Failed to decode node info: %v, origin key/value: [%v]/[%v]", err, key, value)
			continue
		}
		this.nodesLock.Lock()
		this.nodes[key] = nodeInfo
		this.nodesLock.Unlock()
	}
}

// updateTicker upates lastSuggestedNode.
func (this *Monitor) updateTicker() {
	this.updateLastSuggestedNode()
	ticker := time.NewTicker(time.Millisecond * 90)
	for {
		<-ticker.C
		this.updateLastSuggestedNode()
	}
}

func (this *Monitor) updateLastSuggestedNode() {
	this.nodesLock.Lock()
	defer this.nodesLock.Unlock()
	if len(this.nodes) < 1 {
		this.lastSuggestedNode = nil
		return
	}
	// 根据1、5、15分钟的load值算出最小load综合评价系数
	// lowestLoadWeight = loadOne * 100 + loadFive * 20 + loadFifteen * 5
	var lowestLoadWeight float64 = -1
	var lowestLoadWeightNodeKey string

	// 先获取各阶段的最小值
	for k, v := range this.nodes {
		w := v.Info.Load.One*100 + v.Info.Load.Five*20 + v.Info.Load.Fifteen*5
		if lowestLoadWeight > w || lowestLoadWeight == -1 {
			lowestLoadWeight = w
			lowestLoadWeightNodeKey = k
		}
	}
	this.lastSuggestedNode = this.nodes[lowestLoadWeightNodeKey]
}

func (this *Monitor) GetLastSuggestedNodeAddr() string {
	this.nodesLock.RLock()
	defer this.nodesLock.RUnlock()
	if this.lastSuggestedNode == nil {
		return ""
	}
	addr := this.lastSuggestedNode.Ip + ":" + strings.Split(this.lastSuggestedNode.BindAddr, ":")[1]
	return addr
}
